package com.atuguigu.flink.Day04;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;

// Implements a real-time TopN ranking in the simplest possible way
/**
 * Flink job that prints the most-viewed item ids of every event-time window.
 *
 * <p>Pipeline: read a text file line by line, parse each comma-separated line into a
 * {@link UserBehavior}, keep only records whose behavior type is {@code "pv"}, assign
 * event-time timestamps, route every record to one constant key, and rank the item ids
 * collected in each window by how often they occur.
 */
public class Example4 {
    /** Maximum number of ranked entries printed per window. */
    private static final int TOP_N = 3;

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<UserBehavior> pvStream = env
                .readTextFile("E:\\data\\UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        // Expected columns: userId, itemId, categoryId, behaviorType, timestamp.
                        String[] arr = value.split(",");
                        // The last column is scaled by 1000 (presumably seconds -> milliseconds).
                        return new UserBehavior(arr[0], arr[1], arr[2], arr[3], Long.parseLong(arr[4]) * 1000L);
                    }
                })
                // Constant-first equals keeps the filter safe if behaviortype is null.
                .filter(r -> "pv".equals(r.behaviortype))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                                    @Override
                                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        pvStream
                .map(
                        new MapFunction<UserBehavior, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(UserBehavior value) throws Exception {
                                // Rank by item id: the report below labels this field as the item id.
                                return Tuple2.of("key", value.iteamId);
                            }
                        }
                )
                // Every record carries the same key, so one window instance sees all records.
                .keyBy(r -> r.f0)
                // NOTE(review): the second argument of TumblingEventTimeWindows.of is a window
                // offset, not a slide interval. If a 1-hour window sliding every 5 minutes was
                // intended, SlidingEventTimeWindows is the assigner to use - confirm intent.
                .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .process(new ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
                    /**
                     * Counts the occurrences of each item id in the window and emits one
                     * formatted report containing up to {@code TOP_N} entries, highest count first.
                     *
                     * @param s        the window key (always the constant key set upstream)
                     * @param context  gives access to the window, used here for its end timestamp
                     * @param elements all (key, itemId) pairs assigned to this window
                     * @param out      receives the formatted report string
                     */
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, String>> elements, Collector<String> out) throws Exception {
                        // itemId -> number of occurrences in this window
                        HashMap<String, Long> itemCountMap = new HashMap<>();
                        for (Tuple2<String, String> e : elements) {
                            itemCountMap.merge(e.f1, 1L, Long::sum);
                        }

                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<>(itemCountMap.size());
                        itemCountMap.forEach((itemId, count) -> mapList.add(Tuple2.of(itemId, count)));

                        // Long.compare avoids the truncation/overflow of subtracting int values.
                        Comparator<Tuple2<String, Long>> byCountDescending =
                                (o1, o2) -> Long.compare(o2.f1, o1.f1);
                        mapList.sort(byCountDescending);

                        Timestamp windowEnd = new Timestamp(context.window().getEnd());
                        // A window may hold fewer than TOP_N distinct ids; never read past the list.
                        int limit = Math.min(TOP_N, mapList.size());

                        StringBuilder result = new StringBuilder();
                        result.append("============================\n");
                        for (int i = 0; i < limit; i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            result
                                    .append("浏览量No:").append(i + 1).append("-")
                                    .append("商品id:").append(temp.f0).append("-")
                                    .append("浏览量:").append(temp.f1).append("-")
                                    .append("窗口结束时间").append(windowEnd).append("\n");
                        }

                        out.collect(result.toString());
                    }
                })
                .print();

        env.execute();
    }

    /**
     * POJO holding one parsed input record. The public fields and the public no-argument
     * constructor are kept as-is; the field names are part of the class's visible interface.
     */
    public static class UserBehavior {
        public String userId;
        public String iteamId;
        public String categoryid;
        public String behaviortype;
        /** Event time; the parser in {@code main} stores the input value multiplied by 1000. */
        public Long timestamp;

        /** Creates an instance with all fields {@code null}. */
        public UserBehavior() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param userId       user identifier
         * @param iteamId      item identifier
         * @param categoryid   category identifier
         * @param behaviortype behavior type, e.g. {@code "pv"}
         * @param timestamp    event time (may be {@code null})
         */
        public UserBehavior(String userId, String iteamId, String categoryid, String behaviortype, Long timestamp) {
            this.userId = userId;
            this.iteamId = iteamId;
            this.categoryid = categoryid;
            this.behaviortype = behaviortype;
            this.timestamp = timestamp;
        }

        /**
         * Returns a readable dump of all fields; the timestamp is rendered as a
         * {@link Timestamp}, or as {@code null} when it has not been set.
         */
        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId='" + userId + '\'' +
                    ", iteamId='" + iteamId + '\'' +
                    ", categoryid='" + categoryid + '\'' +
                    ", behaviortype='" + behaviortype + '\'' +
                    // Guard against unboxing a null timestamp (e.g. after the no-arg constructor).
                    ", timestamp=" + (timestamp == null ? null : new Timestamp(timestamp)) +
                    '}';
        }
    }
}
